
Conversation

@spiridonov (Contributor) commented Oct 7, 2025

What this PR does / why we need it:

In order to add support for the rate() aggregation function, which will be implemented as count_over_time/$interval, I had to add support for math expressions. Binary expressions with only a single input are supported for now. Things like sum_over_time/count_over_time will be implemented later. There is an optimization in the physical plan that merges several math expression nodes into one.

Minor:

  • fixed some typos

Which issue(s) this PR fixes:
Fixes #

Special notes for your reviewer:

The diff for pkg/engine/internal/planner/logical/planner.go is ugly here; it is better to view the new file as a whole to understand it. I basically split one large function into 3 pieces without changing much of the logic.

Checklist

  • Reviewed the CONTRIBUTING.md guide (required)
  • Documentation added
  • Tests updated
  • Title matches the required conventional commits format, see here
    • Note that Promtail is considered to be feature complete, and future development for logs collection will be in Grafana Alloy. As such, feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
  • Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
  • If the change is deprecating or removing a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR

@spiridonov spiridonov requested a review from a team as a code owner October 7, 2025 18:43
@spiridonov spiridonov enabled auto-merge (squash) October 7, 2025 18:44
@spiridonov spiridonov disabled auto-merge October 7, 2025 21:07
@spiridonov spiridonov marked this pull request as draft October 8, 2025 18:44
@spiridonov spiridonov marked this pull request as ready for review October 9, 2025 16:09
@spiridonov spiridonov enabled auto-merge (squash) October 9, 2025 16:09
@chaudum (Contributor) left a comment

I would appreciate it if you could split the PR into the binop expression implementation and the rate() implementation.

@spiridonov spiridonov changed the title from "feat(engine): rate aggregation and basic math expressions" to "feat(engine): basic math expressions" Oct 14, 2025
@spiridonov (Contributor Author):

@chaudum I removed rate() changes from this PR, now it is just math expressions. PTAL.

Comment on lines 251 to 263
var vecAggType types.VectorAggregationType
switch e.Operation {
//case syntax.OpTypeCount:
// vecAggType = types.VectorAggregationTypeCount
case syntax.OpTypeSum:
vecAggType = types.VectorAggregationTypeSum
//case syntax.OpTypeMax:
// vecAggType = types.VectorAggregationTypeMax
//case syntax.OpTypeMin:
// vecAggType = types.VectorAggregationTypeMin
default:
return nil, errUnimplemented
}
Contributor:

I know that you did not change anything here, but could we follow the pattern that you introduced and move this into a separate function convertVectorAggType(op string) types.VectorAggregationType?

Same for the RangeAggregationType

Contributor Author:

fixed

end: 7200,
interval: 5 * time.Minute,
}
t.Run(`sum by (level) (count_over_time({cluster="prod", namespace=~"loki-.*"} |= "metric.go"[5m]))`, func(t *testing.T) {
Contributor:

nit: make name descriptive

Suggested change
t.Run(`sum by (level) (count_over_time({cluster="prod", namespace=~"loki-.*"} |= "metric.go"[5m]))`, func(t *testing.T) {
t.Run("simple metric query", func(t *testing.T) {

Contributor Author:

fixed

t.Logf("\n%s\n", sb.String())
})

t.Run(`sum by (level) (count_over_time({cluster="prod", namespace=~"loki-.*"}[5m]) / 300)`, func(t *testing.T) {
Contributor:

nit: make name descriptive

Suggested change
t.Run(`sum by (level) (count_over_time({cluster="prod", namespace=~"loki-.*"}[5m]) / 300)`, func(t *testing.T) {
t.Run("binop metric query", func(t *testing.T) {

Contributor Author:

fixed

%7 = LT builtin.timestamp 1970-01-01T02:00:00Z
%8 = SELECT %6 [predicate=%7]
%9 = RANGE_AGGREGATION %8 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%10 = DIV %9 300
Contributor:

nit: I wonder whether we should change the string representation of a literal to something like

Suggested change
%10 = DIV %9 300
%10 = DIV %9 LITERAL(300)

or

Suggested change
%10 = DIV %9 300
%10 = DIV %9 INT64(300)

Contributor Author:

Makes sense, overall. But I would make that change in pkg/engine/internal/types/literal.go for each literal type, and that would cause a lot of diff churn in all the tests where literals are used (mostly in predicates). That is too much for this PR and kind of unrelated.

Contributor:

Yeah, it's just a small thing regarding representation. Does not need to be addressed in this PR
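The representation change discussed above could be sketched as a String() method on the literal type. The type and method names here are assumptions for illustration, not the engine's actual API:

```go
package main

import "fmt"

// Int64Literal is a hypothetical stand-in for the engine's integer literal
// type; the point is only the typed string representation.
type Int64Literal struct{ Value int64 }

// String prints the literal with an explicit type tag, e.g. INT64(300),
// instead of the bare value 300.
func (l Int64Literal) String() string {
	return fmt.Sprintf("INT64(%d)", l.Value)
}

func main() {
	fmt.Printf("%%10 = DIV %%9 %s\n", Int64Literal{Value: 300}) // prints %10 = DIV %9 INT64(300)
}
```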

Comment on lines 53 to 55
var SupportedVectorAggregationTypes = []VectorAggregationType{
VectorAggregationTypeSum, VectorAggregationTypeMax, VectorAggregationTypeMin, VectorAggregationTypeCount,
VectorAggregationTypeSum,
}
Contributor:

Is this used anywhere?

Contributor Author:

Not anymore; fixed. Also removed SupportedRangeAggregationTypes, because the name is now confusing given its only usage.

Comment on lines 30 to 33
fields = append(fields, arrow.Field{
Name: fmt.Sprintf("float64.generated.input_%d", i),
Type: types.Arrow.Float,
})
Contributor:

fyi: You can use the semconv package to generate the field definition, either by using semconv.FieldFromFQN() or semconv.FieldFromIdent()

Contributor Author:

done

)

func NewMathExpressionPipeline(expr *physical.MathExpression, inputs []Pipeline, evaluator expressionEvaluator) *GenericPipeline {
return newGenericPipeline(Local, func(ctx context.Context, inputs []Pipeline) (arrow.Record, error) {
Contributor:

I don't think your implementation of the pipeline is correct.

What this pipeline is doing is returning a new arrow.Record with fields float64.generated.input_0 and timestamp_ns.builtin.timestamp, instead of preserving the existing column names.

While this works for a specific subset of queries that have a vector aggregation without grouping, it does not work when there is a vector aggregation with grouping, e.g. sum by (cluster, namespace) (count_over_time({env="prod"}[1m])) / 100.


So instead of generating a new schema, you "only" need to apply the math function on the float64.generated.value (semconv.ColumnIdentValue) column.

Contributor Author:

This pipeline takes its inputs (just one for now, but multiple in general) and creates input_0, input_1, etc. columns to evaluate the given math expression. The math expression is of the form input_0 / 90 * 199 + input_1, regardless of what exactly those input pipelines are. After evaluation is done, this pipeline returns two columns, value and ts. It is a bug that I am missing the other columns (for groupings) in the output, and I will fix that. But it does not return input_0 in any case.
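The input_N binding described above can be sketched standalone: each input pipeline contributes its value column under a positional name, so the expression can be evaluated without knowing where the inputs came from. The types here are simplified stand-ins, not the engine's actual pipeline API:

```go
package main

import "fmt"

// column is a simplified stand-in for a named Arrow column.
type column struct {
	name   string
	values []float64
}

// bindInputs maps the value column of each input pipeline to a positional
// name (input_0, input_1, ...) so a math expression such as
// input_0 / 300 can reference its operands uniformly.
func bindInputs(valueCols [][]float64) []column {
	cols := make([]column, 0, len(valueCols))
	for i, vals := range valueCols {
		cols = append(cols, column{name: fmt.Sprintf("input_%d", i), values: vals})
	}
	return cols
}

func main() {
	cols := bindInputs([][]float64{{10, 20}, {1, 2}})
	for _, c := range cols {
		fmt.Println(c.name, c.values)
	}
}
```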

Comment on lines +229 to +231
type genericFloat64Function[E arrayType[T], T comparable] struct {
eval func(a, b T) (float64, error)
}
Contributor:

Could this be expressed as a generic function

Suggested change
type genericFloat64Function[E arrayType[T], T comparable] struct {
eval func(a, b T) (float64, error)
}
type genericFunction[E arrayType[T], T comparable, R any] struct {
eval func(a, b T) (R, error)
}

Contributor Author:

I have not found a clean way to do that generically. There are some pretty specific type coercions based on the result type, and pow and mod are implemented totally differently.
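For reference, the generic shape proposed above does compile in isolation; the difficulty the author mentions is in the surrounding coercion logic, not the type declaration itself. This standalone sketch uses a simplified arrayType constraint in place of the real Arrow array interface:

```go
package main

import "fmt"

// arrayType is a simplified stand-in for the Arrow array constraint used in
// the real code: an array of T values with positional access.
type arrayType[T comparable] interface {
	Value(i int) T
	Len() int
}

// genericFunction is the generalized form suggested in the review: the
// result type R is a third type parameter instead of being fixed to float64.
type genericFunction[E arrayType[T], T comparable, R any] struct {
	eval func(a, b T) (R, error)
}

// float64Array is a minimal arrayType[float64] implementation.
type float64Array []float64

func (a float64Array) Value(i int) float64 { return a[i] }
func (a float64Array) Len() int            { return len(a) }

func main() {
	div := genericFunction[float64Array, float64, float64]{
		eval: func(a, b float64) (float64, error) { return a / b, nil },
	}
	r, _ := div.eval(300, 5)
	fmt.Println(r) // prints 60
}
```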

Comment on lines +239 to +242
lhsArr, ok := lhs.ToArray().(E)
if !ok {
return nil, arrow.ErrType
}
Contributor:

The array needs to be released

Suggested change
lhsArr, ok := lhs.ToArray().(E)
if !ok {
return nil, arrow.ErrType
}
lhsArr, ok := lhs.ToArray().(E)
if !ok {
return nil, arrow.ErrType
}
defer lhsArr.Release()

Same for rhsArr.

Contributor Author:

I basically reverted #19496 and now it is not needed anymore.

defer valCol.Release()

schema := batch.Schema()
valueCol := semconv.NewIdentifier(types.ColumnNameGeneratedValue, types.ColumnTypeGenerated, types.Loki.Float)
Contributor:

Instead of valueCol you can use semconv.ColumnIdentValue

Contributor Author:

fixed

Comment on lines 29 to 32
fields := make([]arrow.Field, 0, len(inputs))
for i := range inputs {
fields = append(fields, semconv.FieldFromFQN(fmt.Sprintf("float64.generated.input_%d", i), false))
}
Contributor:

It is a bit confusing that for the fields you iterate over the inputs, but for the cols you do an explicit single append().

I think it would be clearer if you also only append a single field.

Contributor Author:

fixed

Comment on lines 27 to 30
schema := arrow.NewSchema([]arrow.Field{
semconv.FieldFromFQN(colTs, false),
semconv.FieldFromFQN(colVal, false),
}, nil)
Contributor:

Can you extend the test so it also has additional (grouping) columns?

Contributor Author:

added two more labels into the test

@rfratto (Member) commented Oct 16, 2025

I'm having a hard time following the input_N concept. Conventionally, other nodes use an expression tree (the LHS of a BinOp can be another BinOp), but I see that in processBinOp, both sides of the binary operation are only ever a literal or a column reference (which I believe is the reason for the input_N concept). Is there a reason we have to do it this way?

Also: it seems like MathExpression could be represented as a projection. Do we need MathExpression as a distinct concept?

@spiridonov (Contributor Author):

If an expression is, for example, A + B, both operands come from subqueries (and from different input children of the given pipeline), so both A and B will have a value column. This code takes the input pipelines in the correct order, extracts their value columns, and puts them into evalInput under input_N names. This way I can run eval() with the (unchanged) expression and multiple inputs. This might be confusing, I agree, but I did not find another way to run eval() on multiple input pipelines with less magic.

I think there should be a separate concept for math expressions, especially when more complex cases are implemented (such as sum_over_time/count_over_time) that require reading both child pipelines and aligning them by timestamps before feeding them into eval(). This should not belong in a projection.

cc @rfratto

@spiridonov spiridonov requested a review from rfratto October 16, 2025 20:08
@rfratto (Member) commented Oct 16, 2025

Thanks, that makes sense if you're trying to prepare for supporting math over two vectors.

I suspect computations over two vectors (in a way that's compatible with LogQL/PromQL) are going to be trickier than they seem. I believe, in relational algebra terms, they're expressed as a combination of an inner join and a projection.

So, given a query like

(
  sum by (job) (rate({namespace="dev"}[$__auto]))
) + (
  sum by (job) (rate({namespace="qa"}[$__auto]))
)

A physical plan mapping literally to the algebra could be

Projection expressions=[
  left.timestamp as timestamp,
  left.job AS job, 
  left.value + right.value as value,
] 
  InnerJoin left_prefix="left" right_prefix="right" on=[
    left.timestamp = right.timestamp && left.job = right.job 
  ] 
    VectorAggregate op=sum groupings=[job] # left-hand side 
      RangeAggregate op=rate  
        DataObjScan 
    VectorAggregate op=sum groupings=[job] # right-hand side 
      RangeAggregate op=rate 
        DataObjScan 

This needs to explicitly be an inner join, since LogQL's metric queries require the sample to exist on both sides of the expression (this matches PromQL's behaviour).

Example

For example, the two inputs of the InnerJoin

| ts | job   | value |
|----|-------|-------|
| 0  | loki  | 5     |
| 10 | mimir | 10    |

and

| ts | job   | value |
|----|-------|-------|
| 0  | loki  | 15    |
| 10 | tempo | 10    |

is joined into

| left.ts | left.job | left.value | right.ts | right.job | right.value |
|---------|----------|------------|----------|-----------|-------------|
| 0       | loki     | 5          | 0        | loki      | 15          |

and is projected to

| unix_ts | job  | value |
|---------|------|-------|
| 0       | loki | 20    |

I think it's probably okay to have a node which combines the work of the projection and the inner join, though it is possible to represent these operations using projections (which we will have a separate node for anyway), and separating them out may make the plans easier to understand.

All that said, I do wonder if math on two vectors is going to require a lot more thought. Would we be able to simplify the logic here if we descoped that from our consideration?
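The inner-join-plus-projection semantics described above can be sketched standalone in Go: samples from both sides are matched on (timestamp, job), and only pairs present on both sides produce an output row, reproducing the tables in the example. The types are simplified stand-ins, not the engine's:

```go
package main

import "fmt"

// sample is a simplified stand-in for one row of a metric vector.
type sample struct {
	ts    int64
	job   string
	value float64
}

// addVectors evaluates left + right with inner-join semantics: a result row
// is emitted only when a sample with the same (ts, job) exists on both sides.
func addVectors(left, right []sample) []sample {
	type key struct {
		ts  int64
		job string
	}
	rhs := make(map[key]float64, len(right))
	for _, s := range right {
		rhs[key{s.ts, s.job}] = s.value
	}
	var out []sample
	for _, s := range left {
		if v, ok := rhs[key{s.ts, s.job}]; ok {
			out = append(out, sample{ts: s.ts, job: s.job, value: s.value + v})
		}
	}
	return out
}

func main() {
	left := []sample{{0, "loki", 5}, {10, "mimir", 10}}
	right := []sample{{0, "loki", 15}, {10, "tempo", 10}}
	// Only (0, loki) exists on both sides, so one row survives: 5 + 15 = 20.
	fmt.Println(addVectors(left, right)) // prints [{0 loki 20}]
}
```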
